bitkeeper revision 1.1159.269.2 (4231ceccPlcoLKWWixfu3trU9tb-_A)
authormjw@wray-m-3.hpl.hp.com <mjw@wray-m-3.hpl.hp.com>
Fri, 11 Mar 2005 17:01:00 +0000 (17:01 +0000)
committermjw@wray-m-3.hpl.hp.com <mjw@wray-m-3.hpl.hp.com>
Fri, 11 Mar 2005 17:01:00 +0000 (17:01 +0000)
Improve error reporting for save/restore/migrate.

Signed-off-by: Mike Wray <mike.wray@hp.com>
tools/python/xen/xend/XendMigrate.py
tools/python/xen/xend/server/SrvDomain.py

index a1c261deb5dfe2b542ab64e57c6d4523532181f8..f2570ae02b7d892328f9a24c3f710afe6933fe87 100644 (file)
@@ -6,12 +6,14 @@ import errno
 import sys
 import socket
 import time
+import types
 
 from twisted.internet import reactor
 from twisted.internet import defer
 #defer.Deferred.debug = 1
 from twisted.internet.protocol import Protocol
 from twisted.internet.protocol import ClientFactory
+from twisted.python.failure import Failure
 
 import sxp
 import XendDB
@@ -45,11 +47,9 @@ class Xfrd(Protocol):
         sxp.show(req, out=self.transport)
 
     def loseConnection(self):
-        print 'Xfrd>loseConnection>'
         self.transport.loseConnection()
 
     def connectionLost(self, reason):
-        print 'Xfrd>connectionLost>', reason
         self.xinfo.connectionLost(reason)
 
     def dataReceived(self, data):
@@ -70,17 +70,15 @@ class XfrdClientFactory(ClientFactory):
         self.xinfo = xinfo
 
     def startedConnecting(self, connector):
-        print 'Started to connect', 'self=', self, 'connector=', connector
+        pass
 
     def buildProtocol(self, addr):
-        print 'buildProtocol>', addr
         return Xfrd(self.xinfo)
 
     def clientConnectionLost(self, connector, reason):
-        print 'clientConnectionLost>', 'connector=', connector, 'reason=', reason
+        pass
 
     def clientConnectionFailed(self, connector, reason):
-        print 'clientConnectionFailed>', 'connector=', connector, 'reason=', reason
         self.xinfo.error(reason)
 
 class XfrdInfo:
@@ -90,7 +88,7 @@ class XfrdInfo:
 
     """Suspend timeout (seconds).
     We set a timeout because suspending a domain can hang."""
-    timeout = 30
+    timeout = 10
 
     def __init__(self):
         from xen.xend import XendDomain
@@ -98,6 +96,9 @@ class XfrdInfo:
         self.deferred = defer.Deferred()
         self.suspended = {}
         self.paused = {}
+        self.state = 'init'
+        # List of errors encountered.
+        self.errors = []
         
     def vmconfig(self):
         dominfo = self.xd.domain_get(self.src_dom)
@@ -107,12 +108,38 @@ class XfrdInfo:
             val = None
         return val
 
+    def add_error(self, err):
+        """Add an error to the error list.
+        Returns the error added (which may have been unwrapped if it
+        was a Twisted Failure).
+        """
+        while isinstance(err, Failure):
+            err = err.value
+        if err not in self.errors:
+            self.errors.append(err)
+        return err
+
+    def error_summary(self, msg=None):
+        """Get a XendError summarising the errors (if any).
+        """
+        if msg is None:
+            msg = "errors"
+        if self.errors:
+            errmsg = msg + ': ' + ', '.join(map(str, self.errors))
+        else:
+            errmsg = msg
+        return XendError(errmsg)
+
+    def get_errors(self):
+        """Get the list of errors.
+        """
+        return self.errors
+
     def error(self, err):
-        print 'Error>', err
         self.state = 'error'
+        self.add_error(err)
         if not self.deferred.called:
-            print 'Error> calling errback'
-            self.deferred.errback(err)
+            self.deferred.errback(self.error_summary())
 
     def dispatch(self, xfrd, val):
         
@@ -139,28 +166,23 @@ class XfrdInfo:
             cbok(val)
 
     def unknown(self, xfrd, val):
-        print 'unknown>', val
         xfrd.loseConnection()
         return None
 
     def xfr_err(self, xfrd, val):
         # If we get an error with non-zero code the operation failed.
         # An error with code zero indicates hello success.
-        print 'xfr_err>', val
         v = sxp.child0(val)
-        print 'xfr_err>', type(v), v
         err = int(sxp.child0(val))
         if not err: return
-        self.error(err);
+        self.error("transfer daemon (xfrd) error: " + str(err))
         xfrd.loseConnection()
         return None
 
     def xfr_progress(self, xfrd, val):
-        print 'xfr_progress>', val
         return None
 
     def xfr_vm_destroy(self, xfrd, val):
-        print 'xfr_vm_destroy>', val
         try:
             vmid = sxp.child0(val)
             val = self.xd.domain_destroy(vmid)
@@ -168,28 +190,32 @@ class XfrdInfo:
                 del self.paused[vmid]
             if vmid in self.suspended:
                 del self.suspended[vmid]
-        except:
+        except StandardError, err:
+            self.add_error("vm_destroy failed")
+            self.add_error(err)
             val = errno.EINVAL
         return ['xfr.err', val]
     
     def xfr_vm_pause(self, xfrd, val):
-        print 'xfr_vm_pause>', val
         try:
             vmid = sxp.child0(val)
             val = self.xd.domain_pause(vmid)
             self.paused[vmid] = 1
-        except:
+        except StandardError, err:
+            self.add_error("vm_pause failed")
+            self.add_error(err)
             val = errno.EINVAL
         return ['xfr.err', val]
 
     def xfr_vm_unpause(self, xfrd, val):
-        print 'xfr_vm_unpause>', val
         try:
             vmid = sxp.child0(val)
             val = self.xd.domain_unpause(vmid)
             if vmid in self.paused:
                 del self.paused[vmid]
-        except:
+        except StandardError, err:
+            self.add_error("vm_unpause failed")
+            self.add_error(err)
             val = errno.EINVAL
         return ['xfr.err', val]
 
@@ -199,7 +225,6 @@ class XfrdInfo:
         Suspending can hang, so we set a timeout and fail if it
         takes too long.
         """
-        print 'xfr_vm_suspend>', val
         try:
             vmid = sxp.child0(val)
             d = defer.Deferred()
@@ -208,15 +233,15 @@ class XfrdInfo:
             # the domain died. Set a timeout and error handler so the subscriptions
             # will be cleaned up if suspending hangs or there is an error.
             def onSuspended(e, v):
-                print 'xfr_vm_suspend>onSuspended>', e, v
                 if v[1] != vmid: return
                 subscribe(on=0)
-                d.callback(v)
+                if not d.called:
+                    d.callback(v)
                 
             def onDied(e, v):
-                print 'xfr_vm_suspend>onDied>', e, v
                 if v[1] != vmid: return
-                d.errback(XendError('Domain died'))
+                if not d.called:
+                    d.errback(XendError('Domain %s died while suspending' % vmid))
                 
             def subscribe(on=1):
                 if on:
@@ -227,24 +252,25 @@ class XfrdInfo:
                 action('xend.domain.died', onDied)
 
             def cberr(err):
-                print 'xfr_vm_suspend>cberr>', err
                 subscribe(on=0)
+                self.add_error("suspend failed")
+                self.add_error(err)
                 return err
 
+            d.addErrback(cberr)
+            d.setTimeout(self.timeout)
             subscribe()
             val = self.xd.domain_shutdown(vmid, reason='suspend')
             self.suspended[vmid] = 1
-            d.addErrback(cberr)
-            d.setTimeout(self.timeout)
             return d
         except Exception, err:
-            print 'xfr_vm_suspend> Exception', err
+            self.add_error("suspend failed")
+            self.add_error(err)
             traceback.print_exc()
             val = errno.EINVAL
         return ['xfr.err', val]
 
     def connectionLost(self, reason=None):
-        print 'XfrdInfo>connectionLost>', reason
         for vmid in self.suspended:
             try:
                 self.xd.domain_destroy(vmid)
@@ -279,7 +305,7 @@ class XendMigrateInfo(XfrdInfo):
                 ['id',    self.xid   ],
                 ['state', self.state ],
                 ['live',  self.live  ],
-                ['resource', self.resource] ]
+                ['resource', self.resource ] ]
         sxpr_src = ['src', ['host', self.src_host], ['domain', self.src_dom] ]
         sxpr.append(sxpr_src)
         sxpr_dst = ['dst', ['host', self.dst_host] ]
@@ -291,12 +317,12 @@ class XendMigrateInfo(XfrdInfo):
     def request(self, xfrd):
         vmconfig = self.vmconfig()
         if not vmconfig:
+            self.error(XendError("vm config not found"))
             xfrd.loseConnection()
             return
-        log.info('Migrate BEGIN: ' + str(self.sxpr()))
+        log.info('Migrate BEGIN: %s' % str(self.sxpr()))
         eserver.inject('xend.domain.migrate',
-                       [ self.dominfo.name, self.dominfo.id,
-                         "begin", self.sxpr() ])
+                       [ self.dominfo.name, self.dominfo.id, "begin", self.sxpr() ])
         xfrd.request(['xfr.migrate',
                       self.src_dom,
                       vmconfig,
@@ -305,19 +331,6 @@ class XendMigrateInfo(XfrdInfo):
                       self.live,
                       self.resource ])
         
-##     def xfr_vm_suspend(self, xfrd, val):
-##         def cbok(val):
-##             # Special case for localhost: destroy devices early.
-##             if self.dst_host in ["localhost", "127.0.0.1"]:
-##                 self.dominfo.restart_cancel()
-##                 self.dominfo.cleanup()
-##                 self.dominfo.destroy_console()
-##             return val
-            
-##         d = XfrdInfo.xfr_vm_suspend(self, xfrd, val)
-##         d.addCallback(cbok)
-##         return d
-    
     def xfr_migrate_ok(self, xfrd, val):
         dom = int(sxp.child0(val))
         self.state = 'ok'
@@ -327,17 +340,15 @@ class XendMigrateInfo(XfrdInfo):
             self.deferred.callback(self)
 
     def connectionLost(self, reason=None):
-        print 'XfrdMigrateInfo>connectionLost>', reason
         XfrdInfo.connectionLost(self, reason)
         if self.state =='ok':
             log.info('Migrate OK: ' + str(self.sxpr()))
         else:
             self.state = 'error'
-            self.error(XendError("migrate failed"))
+            self.error("migrate failed")
             log.info('Migrate ERROR: ' + str(self.sxpr()))
         eserver.inject('xend.domain.migrate',
-                       [ self.dominfo.name, self.dominfo.id,
-                         self.state, self.sxpr() ])
+                       [ self.dominfo.name, self.dominfo.id, self.state, self.sxpr() ])
 
 class XendSaveInfo(XfrdInfo):
     """Representation of a save in-progress and its interaction with xfrd.
@@ -361,16 +372,15 @@ class XendSaveInfo(XfrdInfo):
         return sxpr
 
     def request(self, xfrd):
-        print '***request>', self.vmconfig()
         vmconfig = self.vmconfig()
         if not vmconfig:
+            self.error(XendError("vm config not found"))
             xfrd.loseConnection()
             return
-        print '***request> begin'
         log.info('Save BEGIN: ' + str(self.sxpr()))
         eserver.inject('xend.domain.save',
-                       [self.dominfo.name, self.dominfo.id,
-                        "begin", self.sxpr()])
+                       [ self.dominfo.name, self.dominfo.id,
+                         "begin", self.sxpr() ])
         xfrd.request(['xfr.save', self.src_dom, vmconfig, self.file ])
         
     def xfr_save_ok(self, xfrd, val):
@@ -380,13 +390,12 @@ class XendSaveInfo(XfrdInfo):
             self.deferred.callback(self)
 
     def connectionLost(self, reason=None):
-        print 'XfrdSaveInfo>connectionLost>', reason
         XfrdInfo.connectionLost(self, reason)
         if self.state =='ok':
             log.info('Save OK: ' + str(self.sxpr()))
         else:
             self.state = 'error'
-            self.error(XendError("save failed"))
+            self.error("save failed")
             log.info('Save ERROR: ' + str(self.sxpr()))
         eserver.inject('xend.domain.save',
                        [ self.dominfo.name, self.dominfo.id,
@@ -409,8 +418,9 @@ class XendRestoreInfo(XfrdInfo):
          return sxpr
 
     def request(self, xfrd):
-        print '***request>', self.file
         log.info('restore BEGIN: ' + str(self.sxpr()))
+        eserver.inject('xend.restore', [ 'begin', self.sxpr()])
+                       
         xfrd.request(['xfr.restore', self.file ])
         
     def xfr_restore_ok(self, xfrd, val):
@@ -419,8 +429,17 @@ class XendRestoreInfo(XfrdInfo):
         self.state = 'ok'
         if not self.deferred.called:
             self.deferred.callback(dominfo)
-
          
+    def connectionLost(self, reason=None):
+        XfrdInfo.connectionLost(self, reason)
+        if self.state =='ok':
+            log.info('Restore OK: ' + self.file)
+        else:
+            self.state = 'error'
+            self.error("restore failed")
+            log.info('Restore ERROR: ' + str(self.sxpr()))
+        eserver.inject('xend.restore', [ self.state,  self.sxpr()])
+
 class XendMigrate:
     """External api for interaction with xfrd for migrate and save.
     Singleton.
@@ -445,7 +464,6 @@ class XendMigrate:
         self.db.saveall("", self.session_db)
 
     def sync_session(self, xid):
-        print 'sync_session>', type(xid), xid, self.session_db[xid]
         self.db.save(xid, self.session_db[xid])
 
     def close(self):
@@ -458,7 +476,6 @@ class XendMigrate:
         self.sync_session(xid)
 
     def _delete_session(self, xid):
-        print '***_delete_session>', xid
         if xid in self.session:
             del self.session[xid]
         if xid in self.session_db:
@@ -482,16 +499,23 @@ class XendMigrate:
         @param info: session
         @return: deferred
         """
-        def cbremove(val):
-            print '***cbremove>', val
+        dfr = defer.Deferred()
+        def cbok(val):
             self._delete_session(info.xid)
+            if not dfr.called:
+                dfr.callback(val)
             return val
+        def cberr(err):
+            self._delete_session(info.xid)
+            if not dfr.called:
+                dfr.errback(err)
+            return err
         self._add_session(info)
-        info.deferred.addCallback(cbremove)
-        info.deferred.addErrback(cbremove)
+        info.deferred.addCallback(cbok)
+        info.deferred.addErrback(cberr)
         xcf = XfrdClientFactory(info)
         reactor.connectTCP('localhost', XFRD_PORT, xcf)
-        return info.deferred
+        return dfr
     
     def migrate_begin(self, dominfo, host, port=XFRD_PORT, live=0, resource=0):
         """Begin to migrate a domain to another host.
index d73a39bff51aa05ef41e12e83ee98fdedd06ef03..a4c06f6c8da7bc6b1fb785b61fb766128a18e0cc 100644 (file)
@@ -29,13 +29,8 @@ class SrvDomain(SrvDir):
                     [['dom', 'int'],
                      ['config', 'sxpr']])
         deferred = fn(req.args, {'dom': self.dom.dom})
-        deferred.addErrback(self._op_configure_err, req)
         return deferred
 
-    def _op_configure_err(self, err, req):
-        req.setResponseCode(http.BAD_REQUEST, "Error: "+ str(err))
-        return str(err)
-        
     def op_unpause(self, op, req):
         val = self.xd.domain_unpause(self.dom.name)
         return val
@@ -68,16 +63,11 @@ class SrvDomain(SrvDir):
                      ['file', 'str']])
         deferred = fn(req.args, {'dom': self.dom.id})
         deferred.addCallback(self._op_save_cb, req)
-        deferred.addErrback(self._op_save_err, req)
         return deferred
 
     def _op_save_cb(self, val, req):
         return 0
 
-    def _op_save_err(self, err, req):
-        req.setResponseCode(http.BAD_REQUEST, "Error: "+ str(err))
-        return str(err)
-        
     def op_migrate(self, op, req):
         fn = FormFn(self.xd.domain_migrate,
                     [['dom', 'str'],
@@ -85,9 +75,7 @@ class SrvDomain(SrvDir):
                      ['live', 'int'],
                      ['resource', 'int']])
         deferred = fn(req.args, {'dom': self.dom.id})
-        print 'op_migrate>', deferred
         deferred.addCallback(self._op_migrate_cb, req)
-        deferred.addErrback(self._op_migrate_err, req)
         return deferred
 
     def _op_migrate_cb(self, info, req):
@@ -101,11 +89,6 @@ class SrvDomain(SrvDir):
         print '_op_migrate_cb> url=', url
         return url
 
-    def _op_migrate_err(self, err, req):
-        print '_op_migrate_err>', err, req
-        req.setResponseCode(http.BAD_REQUEST, "Error: "+ str(err))
-        return str(err)
-
     def op_pincpu(self, op, req):
         fn = FormFn(self.xd.domain_pincpu,
                     [['dom', 'str'],